Introduction
There are use cases where we would like to get the first or last value of something within a group, or at a particular grain.
It is natural to do something in SQL like:
select
col_1,
first(col_2) as first_something,
last(col_2) as last_something
from table
group by 1
order by 1
This leads us to writing Spark code like df.orderBy().groupBy().agg(). Unfortunately, this has unexpected behaviour in Spark: the ordering is not guaranteed to survive the groupBy, because the aggregation triggers a shuffle, so the result can differ from run to run.
Library Imports
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, Window
Create a SparkSession. There is no need to create a SparkContext separately, as you automatically get one as part of the SparkSession.
spark = (
SparkSession.builder
.master("local")
.appName("Exploring Joins")
.config("spark.some.config.option", "some-value")
.getOrCreate()
)
sc = spark.sparkContext
Initial Datasets
pets = spark.createDataFrame(
[
(1, 1, datetime(2018, 1, 1, 1 ,1, 1), 'Bear', 5),
(2, 1, datetime(2010, 1, 1, 1 ,1, 1), 'Chewie', 15),
(3, 1, datetime(2015, 1, 1, 1 ,1, 1), 'Roger', 10),
], ['id', 'breed_id', 'birthday', 'nickname', 'age']
)
pets.toPandas()
| | id | breed_id | birthday | nickname | age |
|---|---|---|---|---|---|
| 0 | 1 | 1 | 2018-01-01 01:01:01 | Bear | 5 |
| 1 | 2 | 1 | 2010-01-01 01:01:01 | Chewie | 15 |
| 2 | 3 | 1 | 2015-01-01 01:01:01 | Roger | 10 |
Option 1: Wrong Way
Result 1
df_1 = (
pets
.orderBy('birthday')
.groupBy('breed_id')
.agg(F.first('nickname').alias('first_breed'))
)
df_1.toPandas()
| | breed_id | first_breed |
|---|---|---|
| 0 | 1 | Chewie |
Result 2
df_2 = (
pets
.orderBy('birthday')
.groupBy('breed_id')
.agg(F.first('nickname').alias('first_breed'))
)
df_2.toPandas()
| | breed_id | first_breed |
|---|---|---|
| 0 | 1 | Chewie |
Option 2: Window Object, Right Way
window = Window.partitionBy('breed_id').orderBy('birthday')
df_3 = (
    pets
    .withColumn(
        'first_breed',
        F.first('nickname').over(
            window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )
    )
    .withColumn(
        'rn',
        F.row_number().over(
            window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
    )
)
df_3.toPandas()
| | id | breed_id | birthday | nickname | age | first_breed | rn |
|---|---|---|---|---|---|---|---|
| 0 | 2 | 1 | 2010-01-01 01:01:01 | Chewie | 15 | Chewie | 1 |
| 1 | 3 | 1 | 2015-01-01 01:01:01 | Roger | 10 | Chewie | 2 |
| 2 | 1 | 1 | 2018-01-01 01:01:01 | Bear | 5 | Chewie | 3 |
Summary
Ok, so my example happened to produce the right answer locally, but don't be fooled: an orderBy() followed by a groupBy() does not guarantee that the order is maintained. The shuffle performed by the aggregation can reorder the data, so the output can change between runs.
reference: https://stackoverflow.com/a/50012355
For any aggregation that depends on an ordering (ie. first, last, etc.), we should avoid groupBy()s and instead use a window object.